package org.example.api;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.example.dao.MessageDao;
import org.example.dao.UserDao;
import org.example.model.Message;
import org.example.model.User;
import org.example.util.WebUtil;
import org.example.util.WebsocketConfigurator;

import javax.servlet.http.HttpSession;
import javax.servlet.http.Part;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

@ServerEndpoint(value="/message", configurator = WebsocketConfigurator.class)
public class MessageEndpoint {
    //当前客户端websocket的会话
    private Session session;
    //当前登录的用户对象
    private User loginUser;

//    private static final ObjectMapper M = new ObjectMapper();
    //使用一个共享的数据结构来保存所有客户端websocket会话
//    private static List<Session> onlineUsers = new ArrayList<>();
//    private static Map<Integer,Session> onlineUsers = new HashMap<>();
    //使用concurrentHashMap实现多线程安全
    private static Map<Integer, Session> onlineUsers = new ConcurrentHashMap<>();

    //无边界阻塞队列
    private static BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();

    //消费消息：创建一个或多个线程，从消息队列中一个一个拿，每个都转发到所有在线用户
    //线程只能创建一次
    static {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        Message m = messageQueue.take();
                        for(Session session: onlineUsers.values()){
                            String json = WebUtil.write(m);
                            session.getBasicRemote().sendText(json);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    //session对象，是建立连接的客户端websocket的会话（建立连接到关闭连接就是某个客户端的一次会话）
    //这个session和登录时，使用HttpSession是不同，
    // 但可以通过配置里边，先使用websocket session保存httpSession，
    // 然后建立连接时获取到httpSession
    @OnOpen
    public void onOpen(Session session) throws IOException {
        //验证下是否登录，踢除已登录的用户
        //保存session到成员变量，后面的事件方法可以使用session来获取信息，发送消息
        //使用websocket session获取httpsession
        HttpSession httpSession = (HttpSession) session.getUserProperties().get("HttpSession");

        //校验：是否已经登录
        User user = WebUtil.getLoginUser(httpSession);
        if(user == null){
            CloseReason reason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,
                    "尚未登录，请登录后再使用");
            session.close(reason);
            return;
        }
        this.loginUser = user;
        //踢掉使用相同账号的登录的用户：找到相同用户的session，然后关闭websocket session
        Session preSession = onlineUsers.get(user.getId());
        if(onlineUsers.containsKey(user.getId())){//所有key中，是否包含用户id
            //相同账号重复登录：踢出上个登陆的用户
            CloseReason reason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,
                    "账号在别处登录");
            preSession.close(reason);
            return;
        }
        this.session = session;
        //后续服务端收到一个消息时，还需要把消息发送给所有在线用户，需要保存所有客户端会话
        onlineUsers.put(user.getId(),session);
        System.out.println("服务器建立连接");

        //建立websocket连接之后，客户端需要接受所有的历史消息
        List<Message> messages = MessageDao.query(user.getLogoutTime());
        for(Message m:messages) { //遍历历史消息，全发送到当前用户的websocket
            String json = WebUtil.write(m);
            session.getBasicRemote().sendText(json);
        }
    }

    @OnClose
    public void onClose(){
        System.out.println("断开连接");
        //关闭连接：删除map中的当前会话，记录当前用户上次注销时间
        onlineUsers.remove(loginUser.getId());
        loginUser.setLogoutTime(new java.util.Date());
        int n = UserDao.updateLogoutTime(loginUser);

    }

    @OnError
    public void onError(Throwable t){
        t.printStackTrace();
        onlineUsers.remove(loginUser.getId());
    }

    //服务端接收消息
    @OnMessage
    public void onMessage(String message) throws IOException {
        //将接收到的json字符串消息，转换为message对象
        Message m = WebUtil.read(message,Message.class);
        //其他字段，从保存的user中获取
        //安全：前端可以通过一些手段修改传输的数据
        m.setUserId(loginUser.getId());
        m.setUserNickname(loginUser.getNickname());

        System.out.println("服务端接收到消息"+message);
        //给所有在线用户发送消息（消息推送，服务器发）
//        for(Session session:onlineUsers.values()){
//            int n = MessageDao.insert(m);
//            String json = WebUtil.write(m);
//            session.getBasicRemote().sendText(json);
//        }
        //需要保存消息到数据库，没有在线用户，下次登录还要查看消息
        //BlockingQueue的结构来异步的发送消息
        int n = MessageDao.insert(m);
        try{
            messageQueue.put(m);
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }


}
